Security News
Opengrep Emerges as Open Source Alternative Amid Semgrep Licensing Controversy
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
The streamx npm package provides a modern, lightweight, and highly performant stream implementation for Node.js. It is designed to be a drop-in replacement for the native Node.js streams with improved performance and consistency. It offers a simpler API and better error handling, and it is compatible with both modern and legacy Node.js versions.
Creating a Readable Stream
This code sample demonstrates how to create a readable stream using streamx that pushes a 'Hello, World!' message and then ends the stream.
const { Readable } = require('streamx');
const readable = new Readable({
read(cb) {
this.push('Hello, World!');
this.push(null); // End the stream
cb();
}
});
readable.on('data', (chunk) => {
console.log(chunk.toString());
});
Creating a Writable Stream
This code sample shows how to create a writable stream using streamx that writes data to the standard output (stdout).
const { Writable } = require('streamx');
const writable = new Writable({
write(data, cb) {
process.stdout.write(data);
cb();
}
});
writable.write('Hello, World!', () => {
console.log('\nWrite completed');
});
Creating a Transform Stream
This code sample illustrates how to create a transform stream using streamx that converts all incoming data to uppercase and then pipes it to the standard output.
const { Transform } = require('streamx');
const transform = new Transform({
transform(data, cb) {
this.push(data.toString().toUpperCase());
cb();
}
});
process.stdin.pipe(transform).pipe(process.stdout);
Piping Streams
This code sample demonstrates how to pipe data from a readable stream to a writable stream using streamx.
const { Readable, Writable } = require('streamx');
const readable = new Readable({
read(cb) {
this.push('Hello, World!');
this.push(null); // End the stream
cb();
}
});
const writable = new Writable({
write(data, cb) {
process.stdout.write(data);
cb();
}
});
readable.pipe(writable);
Through2 is a tiny wrapper around Node.js streams.Transform that makes it easier to create transform streams. It is similar to streamx in that it provides a way to handle stream transformations, but it does not offer the full stream implementation that streamx does.
Highland.js manages synchronous and asynchronous code easily, using nothing more than standard JavaScript and Node-like streams. While it provides similar stream manipulation capabilities, it focuses more on providing a high-level utility layer on top of Node.js streams, unlike streamx which aims to be a drop-in replacement for the core streams API.
Readable-stream is a mirror of the Streams2 and Streams3 implementations in core. It is essentially a user-land copy of the native Node.js stream modules, designed to offer consistent behavior across different Node.js versions. Streamx, on the other hand, aims to improve upon the native streams by offering better performance and a more user-friendly API.
An iteration of the Node.js core streams with a series of improvements.
npm install streamx
Streams have an _open
function that is called before any read/write operation and a _destroy
function that is always run as the last part of the stream.
This makes it easy to maintain state.
Fully integrates a .destroy()
function. When called the stream will wait for any
pending operation to finish and call the stream destroy logic.
Close is always the last event emitted and destroy
is always run.
pipe()
error handlespipe
accepts a callback that is called when the pipeline is fully drained.
It also error handles the streams provided and destroys both streams if either
of them fail.
A map
function can be provided to map your input data into buffers
or other formats. To indicate how much buffer space each data item takes
an byteLength
function can be provided as well.
This removes the need for two modes of streams.
This is a full rewrite, all contained in one file.
Lots of stream methods are simplified based on how I and devs I work with actually use streams in the wild.
streamx aims to be compatible with Node.js streams whenever it is reasonable to do so.
This means that streamx streams behave a lot like Node.js streams from the outside but still provides the improvements above.
streamx has a much smaller footprint when compiled for the browser:
$ for x in stream{,x}; do echo $x: $(browserify -r $x | wc -c) bytes; done
stream: 173844 bytes
streamx: 46943 bytes
With optimizations turned on, the difference is even more stark:
$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | wc -c) bytes; done
stream: 62649 bytes
streamx: 8460 bytes
$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | gzip | wc -c) "bytes (gzipped)"; done
stream: 18053 bytes (gzipped)
streamx: 2806 bytes (gzipped)
To make it easier to integrate streams in a async/await
flow, all streams support a signal
option
that accepts a AbortSignal
to as an
alternative means to .destroy
streams.
const { Readable } = require('streamx')
const rs = new Readable({
read (cb) {
this.push('Cool data')
cb(null)
}
})
rs.on('data', data => console.log('data:', data))
This streamx package contains 4 streams similar to Node.js core.
rs = new stream.Readable([options])
Create a new readable stream.
Options include:
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
byteLength: (data) => size, // optional function that calculates the byte size of input data
signal: abortController.signal, // optional AbortSignal that triggers `.destroy` when on `abort`
eagerOpen: false // eagerly open the stream
}
In addition you can pass the open
, read
, and destroy
functions as shorthands in
the constructor instead of overwrite the methods below.
The default byteLength function returns the byte length of buffers and 1024
for any other object. This means the buffer will contain around 16 non buffers
or buffers worth 16kb when full if the defaults are used.
If you set highWaterMark to 0
then all read ahead buffering on the stream
is disabled and it will only call _read
when a user reads rather than ahead of time.
rs._read(cb)
This function is called when the stream wants you to push new data. Overwrite this and add your own read logic. You should call the callback when you are fully done with the read.
Can also be set using options.read
in the constructor.
Note that this function differs from Node.js streams in that it takes the "read finished" callback.
drained = rs.push(data)
Push new data to the stream. Returns true if the buffer is not full and you should push more data if you can.
If you call rs.push(null)
you signal to the stream that no more
data will be pushed and that you want to end the stream.
data = rs.read()
Read a piece of data from the stream buffer. If the buffer is currently empty
null
will be returned and you should wait for readable
to be emitted before
trying again. If the stream has been ended it will also return null
.
Note that this method differs from Node.js streams in that it does not accept an optional amounts of bytes to consume.
rs.unshift(data)
Add a piece of data to the front of the buffer. Use this if you read too much
data using the rs.read()
function.
rs._open(cb)
This function is called once before the first read is issued. Use this function to implement your own open logic.
Can also be set using options.open
in the constructor.
rs._destroy(cb)
This function is called just before the stream is fully destroyed. You should use this to implement whatever teardown logic you need. The final part of the stream life cycle is always to call destroy itself so this function will always be called wheather or not the stream ends gracefully or forcefully.
Can also be set using options.destroy
in the constructor.
Note that the _destroy
might be called without the open function being called
in case no read was ever performed on the stream.
rs._predestroy()
A simple hook that is called as soon as the first stream.destroy()
call is invoked.
Use this in case you need to cancel pending reads (if possible) instead of waiting for them to finish.
Can also be set using options.predestroy
in the constructor.
rs.destroy([error])
Forcefully destroy the stream. Will call _destroy
as soon as all pending reads have finished.
Once the stream is fully destroyed close
will be emitted.
If you pass an error this error will be emitted just before close
is, signifying a reason
as to why this stream was destroyed.
rs.pause()
Pauses the stream. You will only need to call this if you want to pause a resumed stream.
Returns this stream instance.
rs.resume()
Will start reading data from the stream as fast as possible.
If you do not call this, you need to use the read()
method to read data or the pipe()
method to
pipe the stream somewhere else or the data
handler.
If none of these option are used the stream will stay paused.
Returns this stream instance.
bool = Readable.isPaused(rs)
Returns true
if the stream is paused, else false
.
writableStream = rs.pipe(writableStream, [callback])
Efficently pipe the readable stream to a writable stream (can be Node.js core stream or a stream from this package). If you provide a callback the callback is called when the pipeline has fully finished with an optional error in case it failed.
To cancel the pipeline destroy either of the streams.
rs.on('readable')
Emitted when data is pushed to the stream if the buffer was previously empty.
rs.on('data', data)
Emitted when data is being read from the stream. If you attach a data handler you are implicitly resuming the stream.
rs.on('end')
Emitted when the readable stream has ended and no data is left in it's buffer.
rs.on('close')
Emitted when the readable stream has fully closed (i.e. it's destroy function has completed)
rs.on('error', err)
Emitted if any of the stream operations fail with an error. close
is always emitted right after this.
rs.on('piping', dest)
Emitted when the readable stream is pipeing to a destination.
rs.destroyed
Boolean property indicating wheather or not this stream has been destroyed.
bool = Readable.isBackpressured(rs)
Static method to check if a readable stream is currently under backpressure.
stream = Readable.from(arrayOrBufferOrStringOrAsyncIterator)
Static method to turn an array or buffer or string or AsyncIterator into a readable stream.
ws = new stream.Writable([options])
Create a new writable stream.
Options include:
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
byteLength: (data) => size, // optional function that calculates the byte size of input data
signal: abortController.signal // optional AbortSignal that triggers `.destroy` when on `abort`
}
In addition you can pass the open
, write
, final
, and destroy
functions as shorthands in
the constructor instead of overwrite the methods below.
The default byteLength function returns the byte length of buffers and 1024
for any other object. This means the buffer will contain around 16 non buffers
or buffers worth 16kb when full if the defaults are used.
ws._open(cb)
This function is called once before the first write is issued. Use this function to implement your own open logic.
Can also be set using options.open
in the constructor.
ws._destroy(cb)
This function is called just before the stream is fully destroyed. You should use this to implement whatever teardown logic you need. The final part of the stream life cycle is always to call destroy itself so this function will always be called wheather or not the stream ends gracefully or forcefully.
Can also be set using options.destroy
in the constructor.
Note that the _destroy
might be called without the open function being called
in case no write was ever performed on the stream.
ws._predestroy()
A simple hook that is called as soon as the first stream.destroy()
call is invoked.
Use this in case you need to cancel pending writes (if possible) instead of waiting for them to finish.
Can also be set using options.predestroy
in the constructor.
ws.destroy([error])
Forcefully destroy the stream. Will call _destroy
as soon as all pending reads have finished.
Once the stream is fully destroyed close
will be emitted.
If you pass an error this error will be emitted just before close
is, signifying a reason
as to why this stream was destroyed.
drained = ws.write(data)
Write a piece of data to the stream. Returns true
if the stream buffer is not full and you
should keep writing to it if you can. If false
is returned the stream will emit drain
once it's buffer is fully drained.
ws._write(data, callback)
This function is called when the stream want to write some data. Use this to implement your own write logic. When done call the callback and the stream will call it again if more data exists in the buffer.
Can also be set using options.write
in the constructor.
ws._writev(batch, callback)
Similar to _write
but passes an array of all data in the current write buffer instead of the oldest one.
Useful if the destination you are writing the data to supports batching.
Can also be set using options.writev
in the constructor.
ws.end()
Gracefully end the writable stream. Call this when you no longer want to write to the stream.
Once all writes have been fully drained finish
will be emitted.
Returns this stream instance.
ws._final(callback)
This function is called just before finish
is emitted, i.e. when all writes have flushed but ws.end()
have been called. Use this to implement any logic that should happen after all writes but before finish.
Can also be set using options.final
in the constructor.
ws.on('finish')
Emitted when the stream has been ended and all writes have been drained.
ws.on('close')
Emitted when the readable stream has fully closed (i.e. it's destroy function has completed)
ws.on('error', err)
Emitted if any of the stream operations fail with an error. close
is always emitted right after this.
ws.on('pipe', src)
Emitted when a readable stream is being piped to the writable one.
ws.destroyed
Boolean property indicating wheather or not this stream has been destroyed.
bool = Writable.isBackpressured(ws)
Static method to check if a writable stream is currently under backpressure.
bool = await Writable.drained(ws)
Static helper to wait for a stream to drain the currently queued writes. Returns true if they were drained and false otherwise if the stream was destroyed.
s = new stream.Duplex([options])
A duplex stream is a stream that is both readable and writable.
Since JS does not support multiple inheritance it inherits directly from Readable but implements the Writable API as well.
If you want to provide only a map function for the readable side use mapReadable
instead.
If you want to provide only a byteLength function for the readable side use byteLengthReadable
instead.
Same goes for the writable side but using mapWritable
and byteLengthWritable
instead.
A Transform stream is a duplex stream with an ._transform
template method that allows to
asynchronously map the input to a different output.
The transform stream overrides the _write
and _read
operations of Readable
and Writable
but
still allows the setting of these options in the constructor. Usually it is unnecessary to pass
in read
or write
/writev
or to override the corresponding ._read
, ._write
or ._writev
operation.
ts = new stream.Transform([options])
A transform stream is a duplex stream that maps the data written to it and emits that as readable data.
Has the same options as a duplex stream except you can provide a transform
function also.
ts._transform(data, callback)
Transform the incoming data. Call callback(null, mappedData)
or use ts.push(mappedData)
to
return data to the readable side of the stream.
Per default the transform function just remits the incoming data making it act as a pass-through stream.
pipeline
allows to stream form a readable through a set of duplex streams to a writable entry.
const { pipeline, Readable, Transform, Writable } = require('streamx')
const lastStream = pipeline(
Readable.from([1, 2, 3]),
new Transform({
transform (from, cb) {
this.push(from.toString())
cb()
}
}),
new Writable({
write (data, cb) {
console.log(data)
cb()
}
})
error => {
// Callback once write has finished
}
)
lastStream = stream.pipeline(...streams, [done])
Pipe all streams together and return the last stream piped to. When the last stream finishes the pipeline ended succesfully.
If any of the streams error, whether they are Node.js core streams or streamx streams, all streams in the pipeline are shutdown.
Optionally you can pass a done callback to know when the pipeline is done.
promise = stream.pipelinePromise(...streams)
Same as normal pipeline except instead of returning the last stream it returns a promise representing the done callback. Note you should error handle this promise if you use this version.
bool = isStream(stream)
bool = isStreamx(stream)
err = getStreamError(stream, [options])
Returns null
if the stream has no errors.
Streamx aims to be minimal and stable. It therefore only contains a minimal set of utilities. To help discover of other modules that help you build streamx apps, we link some useful utilities here
stream.compose
and the duplexify
and pumpify
modules.If you want to help contribute to streamx a good way to start is to help writing more test cases, compatibility tests, documentation, or performance benchmarks.
If in doubt open an issue :)
MIT
FAQs
An iteration of the Node.js core streams with a series of improvements
The npm package streamx receives a total of 7,608,944 weekly downloads. As such, streamx popularity was classified as popular.
We found that streamx demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
Security News
Critics call the Node.js EOL CVE a misuse of the system, sparking debate over CVE standards and the growing noise in vulnerability databases.
Security News
cURL and Go security teams are publicly rejecting CVSS as flawed for assessing vulnerabilities and are calling for more accurate, context-aware approaches.